{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# DataFrame object\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create SparkContext and SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the entry points to Spark.\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# Stop any SparkContext left over from a previous run; `sc` is undefined on a fresh kernel.\n",
    "try:\n",
    "    sc.stop()\n",
    "except NameError:\n",
    "    pass\n",
    "\n",
    "sc = SparkContext()\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a DataFrame object"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create DataFrame by reading a file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "|_c0              |mpg |cyl|disp |hp |drat|wt   |qsec |vs |am |gear|carb|\n",
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "|Mazda RX4        |21.0|6  |160.0|110|3.9 |2.62 |16.46|0  |1  |4   |4   |\n",
      "|Mazda RX4 Wag    |21.0|6  |160.0|110|3.9 |2.875|17.02|0  |1  |4   |4   |\n",
      "|Datsun 710       |22.8|4  |108.0|93 |3.85|2.32 |18.61|1  |1  |4   |1   |\n",
      "|Hornet 4 Drive   |21.4|6  |258.0|110|3.08|3.215|19.44|1  |0  |3   |1   |\n",
      "|Hornet Sportabout|18.7|8  |360.0|175|3.15|3.44 |17.02|0  |0  |3   |2   |\n",
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "mtcars = spark.read.csv(path='../../data/mtcars.csv',\n",
    "                        sep=',',\n",
    "                        encoding='UTF-8',\n",
    "                        comment=None,\n",
    "                        header=True, \n",
    "                        inferSchema=True)\n",
    "mtcars.show(n=5, truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create DataFrame with `createDataFrame` function"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### From an RDD\n",
    "\n",
    "Each element of the RDD is a `Row` object here; the field names of the `Row` become the column names."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(x=[1, 2, 3], y=['a', 'b', 'c']), Row(x=[4, 5, 6], y=['e', 'f', 'g'])]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "rdd = sc.parallelize([\n",
    "    Row(x=[1,2,3], y=['a','b','c']),\n",
    "    Row(x=[4,5,6], y=['e','f','g'])\n",
    "])\n",
    "rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---------+\n",
      "|        x|        y|\n",
      "+---------+---------+\n",
      "|[1, 2, 3]|[a, b, c]|\n",
      "|[4, 5, 6]|[e, f, g]|\n",
      "+---------+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = spark.createDataFrame(rdd)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### From pandas DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>[1, 2, 3]</td>\n",
       "      <td>[a, b, c]</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>[4, 5, 6]</td>\n",
       "      <td>[e, f, g]</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "           x          y\n",
       "0  [1, 2, 3]  [a, b, c]\n",
       "1  [4, 5, 6]  [e, f, g]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import pandas as pd\n",
    "pdf = pd.DataFrame({\n",
    "    'x': [[1,2,3], [4,5,6]],\n",
    "    'y': [['a','b','c'], ['e','f','g']]\n",
    "})\n",
    "pdf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---------+\n",
      "|        x|        y|\n",
      "+---------+---------+\n",
      "|[1, 2, 3]|[a, b, c]|\n",
      "|[4, 5, 6]|[e, f, g]|\n",
      "+---------+---------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = spark.createDataFrame(pdf)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### From a list\n",
    "\n",
    "Each element in the list becomes a `Row` in the DataFrame."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+------+\n",
      "|letter|number|\n",
      "+------+------+\n",
      "|     a|     1|\n",
      "|     b|     2|\n",
      "+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "my_list = [['a', 1], ['b', 2]]\n",
    "df = spark.createDataFrame(my_list, ['letter', 'number'])\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('letter', 'string'), ('number', 'bigint')]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+---+\n",
      "|my_column| _2|\n",
      "+---------+---+\n",
      "|        a|  1|\n",
      "|        b|  2|\n",
      "+---------+---+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "my_list = [['a', 1], ['b', 2]]\n",
    "df = spark.createDataFrame(my_list, ['my_column'])\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('my_column', 'string'), ('_2', 'bigint')]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following code generates a DataFrame with two columns, each of which is an array column.\n",
    "\n",
    "Why are array columns generated in this case?\n",
    "The list **my_list** has only one element, a tuple, so the DataFrame has only one row. The tuple has two elements, so the DataFrame has two columns. Each element of the tuple is a list, so each resulting column holds an array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+------+\n",
      "|     x|     y|\n",
      "+------+------+\n",
      "|[a, 1]|[b, 2]|\n",
      "+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "my_list = [(['a', 1], ['b', 2])]\n",
    "df = spark.createDataFrame(my_list, ['x', 'y'])\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Column instance\n",
    "\n",
    "Column instances can be created in two ways:\n",
    "\n",
    "1. directly select a column out of a *DataFrame*: `df.colName`\n",
    "2. create from a column expression: `df.colName + 1`\n",
    "\n",
    "Technically, there is only one way to create a column instance, because every column expression starts from a column instance.\n",
    "\n",
    "**Remember how to create column instances, because this is usually the starting point when we want to operate on DataFrame columns.**\n",
    "\n",
    "The `Column` class comes with some methods that operate on a column instance. ***However, almost all functions from the `pyspark.sql.functions` module take one or more column instances as argument(s)***. These functions are important tools for data manipulation.\n",
    "\n",
    "## DataFrame column methods\n",
    "\n",
    "### Methods that take column names as arguments:\n",
    "\n",
    "* `corr(col1, col2)`: two column names.\n",
    "* `cov(col1, col2)`: two column names.\n",
    "* `crosstab(col1, col2)`: two column names.\n",
    "* `describe(*cols)`: ***`*cols` refers to only column names (strings).***\n",
    "\n",
    "### Methods that take column names or column expressions or **both** as arguments:\n",
    "\n",
    "* `cube(*cols)`: column names (string) or column expressions or **both**.\n",
    "* `drop(*cols)`: ***a list of column names OR a single column expression.***\n",
    "* `groupBy(*cols)`: column name (string) or column expression or **both**.\n",
    "* `rollup(*cols)`: column name (string) or column expression or **both**.\n",
    "* `select(*cols)`: column name (string) or column expression or **both**.\n",
    "* `sort(*cols, **kwargs)`: column name (string) or column expression or **both**.\n",
    "* `sortWithinPartitions(*cols, **kwargs)`: column name (string) or column expression or **both**.\n",
    "* `orderBy(*cols, **kwargs)`: column name (string) or column expression or **both**.\n",
    "* `sampleBy(col, fractions, seed=None)`: a column name.\n",
    "* `toDF(*cols)`: **a list of column names (string).**\n",
    "* `withColumn(colName, col)`: `colName` refers to a column name; `col` refers to a column expression.\n",
    "* `withColumnRenamed(existing, new)`: takes column names as arguments.\n",
    "* `filter(condition)`: `condition` refers to a column expression that returns values of `types.BooleanType`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
